package com.yxsk.relay.job.admin.core.collector;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.yxsk.relay.job.admin.core.schedule.JobTriggerInfo;
import com.yxsk.relay.job.admin.core.schedule.RelayJobScheduleRegistrar;
import com.yxsk.relay.job.admin.core.schedule.constant.JobDataConstant;
import com.yxsk.relay.job.admin.core.schedule.quartz.vo.JobConfig;
import com.yxsk.relay.job.admin.data.entity.JobCollectLog;
import com.yxsk.relay.job.admin.data.entity.JobDetails;
import com.yxsk.relay.job.admin.data.entity.JobExecuteLog;
import com.yxsk.relay.job.admin.data.entity.JobTriggerLog;
import com.yxsk.relay.job.admin.data.service.JobCollectLogService;
import com.yxsk.relay.job.admin.data.service.JobDetailService;
import com.yxsk.relay.job.admin.data.service.JobExecuteLogService;
import com.yxsk.relay.job.admin.data.service.JobTriggerLogService;
import com.yxsk.relay.job.admin.exception.job.JobResultCollectException;
import com.yxsk.relay.job.component.admin.utils.SpringBeanUtils;
import com.yxsk.relay.job.component.common.constant.NetProtocol;
import com.yxsk.relay.job.component.common.constant.UriConstant;
import com.yxsk.relay.job.component.common.exception.remote.RemoteCallException;
import com.yxsk.relay.job.component.common.protocol.caller.RemoteCaller;
import com.yxsk.relay.job.component.common.protocol.message.base.ResultResponse;
import com.yxsk.relay.job.component.common.protocol.message.collect.ExecuteResultCollectRequest;
import com.yxsk.relay.job.component.common.protocol.message.collect.ExecuteResultCollectResponse;
import com.yxsk.relay.job.component.common.utils.DateUtils;
import com.yxsk.relay.job.component.common.utils.ExceptionUtils;
import com.yxsk.relay.job.component.common.utils.SerialNoUtils;
import com.yxsk.relay.job.component.common.vo.Endpoint;
import com.yxsk.relay.job.component.common.vo.EndpointExecuteResult;
import com.yxsk.relay.job.component.common.vo.ExecuteResult;
import com.yxsk.relay.job.component.common.vo.TriggerResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @Author 11376
 * @CreaTime 2019/8/5 22:57
 * @Description
 */
@Slf4j
public class QuartzRemoteJobResultCollector extends RemoteJobResultCollector {

    @Override
    public void collect(JobTriggerInfo triggerInfo) {
        // 查询所有执行的任务
        JobExecuteLogService logService = SpringBeanUtils.getBean(JobExecuteLogService.class);

        List<JobExecuteLog> logList = logService.findByTriggerId(triggerInfo.getTriggerId());
        if (!CollectionUtils.isEmpty(logList)) {
            // 集群任务结果计算
            List<EndpointExecuteResult> executeResults = new ArrayList<>(logList.size() * 4 / 3);

            logList.stream().forEach(jobExecuteLog -> {
                EndpointExecuteResult executeResult = new EndpointExecuteResult();
                Endpoint endpoint = new Endpoint();
                endpoint.setHost(jobExecuteLog.getEndpointIp());
                endpoint.setAuthToken(jobExecuteLog.getAuthToken());
                endpoint.setPort(jobExecuteLog.getEndpointPort());
                endpoint.setProtocol(NetProtocol.getProtocol(jobExecuteLog.getNetProtocol()));

                executeResult.setEndpoint(endpoint);
                executeResult.setExecuteParam(jobExecuteLog.getExecuteParam());
                executeResult.setResult(jobExecuteLog.getExecuteResult());
                executeResult.setErrorMsg(jobExecuteLog.getExecuteErrorMessage());
                executeResult.setSuccess(ExecuteResult.ExecuteResultCode.OK.getCode().equals(jobExecuteLog.getExecuteStatus()));

                executeResults.add(executeResult);
            });

            // 组装请求报文
            ExecuteResultCollectRequest collectRequest = new ExecuteResultCollectRequest();
            collectRequest.setExecuteResults(executeResults);
            collectRequest.setHandleName(SpringBeanUtils.getBean(JobDetailService.class).findById(triggerInfo.getJobId()).getHandlerName());

            // 获取路由节点
            // TODO 暂时由执行结果中的主机完成
            for (JobExecuteLog executeLog : logList) {
                collectRequest.setRequestTime(DateUtils.getCurrentDate());
                collectRequest.setSerialNo(SerialNoUtils.nextId());
                collectRequest.setVersion(UriConstant.VERSION_NO);

                // 请求主机
                RemoteCaller remoteCaller = this.remoteCaller(NetProtocol.getProtocol(executeLog.getNetProtocol()));
                String uri = remoteCaller.buildUri(executeLog.getEndpointIp(), executeLog.getEndpointPort(), UriConstant.TASK_RESULT_COLLECT_URI);

                Map<String, String> header = null;
                if (StringUtils.hasLength(executeLog.getAuthToken())) {
                    header = new HashMap<>(2);
                    header.put(UriConstant.AUTHORIZATION_HEADER_KEY, executeLog.getAuthToken());
                }

                try {
                    ResultResponse<ExecuteResultCollectResponse> resultResponse = remoteCaller.call(collectRequest, header, uri, new TypeReference<ResultResponse<ExecuteResultCollectResponse>>() {
                    });
                    if (ResultResponse.isOk(resultResponse)) {
                        // 处理成功
                        log.info("Cluster job result handle success.");

                        String result = resultResponse.getData().getResult();
                        this.saveCollectLog(collectRequest, triggerInfo,
                                executeLog.getEndpointIp(), executeLog.getEndpointPort(), executeLog.getAuthToken(),
                                NetProtocol.getProtocol(executeLog.getNetProtocol()), result, null, true);

                        // 触发子任务
                        handleChildrenJob(triggerInfo, result);
                        return;
                    }

                    String errorMsg = MessageFormat.format("collect error, response code: {0}, response message: {1}", resultResponse.getCode(), resultResponse.getMessage());
                    this.saveCollectLog(collectRequest, triggerInfo,
                            executeLog.getEndpointIp(), executeLog.getEndpointPort(), executeLog.getAuthToken(),
                            NetProtocol.getProtocol(executeLog.getNetProtocol()), null, new JobResultCollectException(errorMsg), true);

                } catch (RemoteCallException e) {
                    log.error("Handle cluster result error.", e);

                    this.saveCollectLog(collectRequest, triggerInfo,
                            executeLog.getEndpointIp(), executeLog.getEndpointPort(), executeLog.getAuthToken(),
                            NetProtocol.getProtocol(executeLog.getNetProtocol()), null, e, true);
                }
            }

            // 更新触发任务状态
            JobTriggerLogService triggerLogService = SpringBeanUtils.getBean(JobTriggerLogService.class);
            JobTriggerLog triggerLog = triggerLogService.findById(triggerInfo.getTriggerId());
            triggerLog.setTriggerResultCode(TriggerResult.TriggerResultCode.RESULT_COLLECT_ERROR.getCode());
            triggerLogService.updateById(triggerLog);
        }
    }


    private void saveCollectLog(ExecuteResultCollectRequest collectRequest, JobTriggerInfo triggerInfo, String endpointHost, Integer port, String authToken, NetProtocol netProtocol, String result, Throwable e, boolean success) {
        JobCollectLog collectLog = new JobCollectLog();
        collectLog.setId(collectRequest.getSerialNo());
        collectLog.setEndpointIp(endpointHost);
        collectLog.setEndpointPort(port);
        collectLog.setAuthToken(authToken);
        collectLog.setCollectParam(JSONObject.toJSONString(collectRequest));
        collectLog.setBeginTime(collectRequest.getRequestTime());
        collectLog.setEndTime(DateUtils.getCurrentDate());
        collectLog.setNetProtocol(netProtocol.getProtocol());
        collectLog.setTriggerLogId(triggerInfo.getTriggerId());

        if (e != null) {
            collectLog.setErrorMessage(ExceptionUtils.getStackInfo(e));
        }
        collectLog.setResult(result);

        JobCollectLog.CollectStatusEnum status = JobCollectLog.CollectStatusEnum.FAILED;
        if (success) {
            status = JobCollectLog.CollectStatusEnum.SUCCESS;
        }

        collectLog.setCollectStatus(status.getStatus());

        // 保存
        SpringBeanUtils.getBean(JobCollectLogService.class).addEntity(collectLog);
    }

    protected void handleChildrenJob(JobTriggerInfo triggerInfo, String result) {
        // 任务执行成功，启动后置任务
        JobDetailService detailService = SpringBeanUtils.getBean(JobDetailService.class);
        List<JobDetails> children = detailService.getChildrenJob(triggerInfo.getJobId());
        if (!CollectionUtils.isEmpty(children)) {
            RelayJobScheduleRegistrar scheduleRegistrar = SpringBeanUtils.getBean(RelayJobScheduleRegistrar.class);
            children.stream().forEach(jobDetail -> {
                if (JobConfig.JobStatus.NORMAL.getStatus().equals(jobDetail.getStatus())) {
                    // 传递父级触发任务id
                    Map<String, Object> paramMap = new HashMap<>(2);
                    // 将执行结果传递到下一个任务
                    paramMap.put(JobDataConstant.CUSTOM_TRIGGER_PARAM_KEY, result);

                    // 集群模式需等待所有任务执行完成之后出发子任务
                    scheduleRegistrar.runNow(detailService.convert(jobDetail), paramMap);
                }
            });
        }
    }

}
